Don't continue running Monitor collectors if we're already timed out#4706
Conversation
Pull request overview
This PR aims to stop running additional cluster monitor collectors once the monitoring context has timed out/canceled, and adds unit coverage for the skip behavior.
Changes:
- Add a `parallelism` field to `Monitor` to control collector goroutine concurrency (defaulting to `MONITOR_GOROUTINES_PER_CLUSTER`).
- Short-circuit `timeCall` when `ctx.Err() != nil`, emitting a new `monitor.cluster.collector.skipped` gauge and returning a wrapped error.
- Extend `cluster_test.go` to cover the “timeout/cancel during collector causes subsequent collectors to be skipped” scenario.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pkg/monitor/cluster/cluster.go | Adds collector-skipping on context cancellation, introduces configurable parallelism, and emits a new “skipped collector” metric. |
| pkg/monitor/cluster/cluster_test.go | Adds a unit test case asserting collectors are skipped after cancellation and adjusts context usage and monitor parallelism for determinism. |
Comments suppressed due to low confidence (1)
pkg/monitor/cluster/cluster.go:306
- This change skips work inside timeCall when ctx is done, but Monitor() still calls wg.Go for every collector. With SetLimit, this can still end up spawning new goroutines after the context has been canceled (they just return quickly). If the intent is to stop spawning new goroutines once timed out, add a ctx.Err() check in the collectors loop before calling wg.Go and handle remaining collectors without starting goroutines (optionally still emitting the skipped metric / returning a skip error for each).
```go
// Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER)
// goroutines for collecting metrics
wg := new(errgroup.Group)
wg.SetLimit(mon.parallelism)

// Create a channel capable of buffering one error from every collector
errChan := make(chan error, len(mon.collectors))

for _, f := range mon.collectors {
	wg.Go(func() error {
		innerErr := mon.timeCall(ctx, f)
		if innerErr != nil {
			// NOTE: The channel only has room to accommodate one error per
			// collector, so if a collector needs to return multiple errors
			// they should be joined into a single one (see errors.Join)
			// before being added.
			errChan <- innerErr
		}
		return nil
	})
}
```
…mit metrics when we have timed out (force-pushed from e75e698 to e1b03b6)
```go
	"github.com/Azure/ARO-RP/pkg/util/version"
)

const MONITOR_GOROUTINES_PER_CLUSTER = 5
```
Should this become a dynamic field based on GOMAXPROCS?
Something like GOMAXPROCS divided by the number of clusters we want to check in parallel?
```go
// metrics

// Run up to mon.parallelism (default: MONITOR_GOROUTINES_PER_CLUSTER)
// goroutines for collecting metrics
wg := new(errgroup.Group)
```
I find this misleading: errgroup and waitgroup are different things that should probably be named and managed differently, WDYT?
Also I have the impression that here we're losing a critical opportunity to pass down the current context to the spawned goroutines, to get safe and deterministic behavior when something happens to the parent context and to ensure we don't leave orphan goroutines around.
The way I would address this could be similar to what I've used in a previous commit, by using errgroup.WithContext(ctx).
```go
errChan := make(chan error, len(mon.collectors))

for _, f := range mon.collectors {
	wg.Go(func() error {
```
If we prefer to have a faster, non-blocking design, we could replace wg.Go with wg.TryGo().
If the limit set by SetLimit is reached, TryGo returns false immediately instead of blocking the caller.
Using them together allows you to handle overload gracefully rather than just pausing (which is what SetLimit + Go does).
Otherwise it is fine as it is now, maybe adding:

```go
if ctx.Err() != nil {
	break
}
wg.Go(func() error {
	...
```

That's intentional -- I do want this to block, so it keeps feeding new ones in as it gets unblocked by the earlier ones completing.
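For context on the trade-off discussed above: TryGo's non-blocking check boils down to a `select` with a `default` case on the group's internal limiter. A dependency-free sketch of that difference, with `tryAcquire` standing in for errgroup's internals:

```go
package main

import "fmt"

// tryAcquire sketches how a non-blocking TryGo differs from a blocking Go:
// instead of waiting on the semaphore, it bails out with false when the
// concurrency limit is already saturated, letting the caller skip or back
// off rather than pause.
func tryAcquire(sem chan struct{}) bool {
	select {
	case sem <- struct{}{}:
		return true
	default:
		return false // limit reached: no blocking
	}
}

func main() {
	sem := make(chan struct{}, 1)
	fmt.Println(tryAcquire(sem)) // true: slot available
	fmt.Println(tryAcquire(sem)) // false: limit of 1 reached
}
```

A blocking `Go`, by contrast, corresponds to a plain `sem <- struct{}{}` send, which is exactly the "keeps feeding new ones in as earlier ones complete" behavior the author wants here.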
```go
type collectorFunc func(context.Context) error

type Monitor struct {
```
Possible root cause
There are 27 collectors running with a concurrency limit of 5 (MONITOR_GOROUTINES_PER_CLUSTER).
Each workOne call creates a fresh cluster.Monitor struct which allocates:
| Component | Estimate |
|---|---|
| Goroutine stacks (1 execute + up to 5 collectors × 8 KB) | ~48 KB |
| Monitor struct + 5 K8s clientsets (kubernetes, config, operator, aro, OPC) + shared HTTP transport + TLS session cache + REST mapper | ~100–200 KB |
| In-flight HTTP response buffers (list pods, nodes, machines…) | ~50–100 KB |
| Total per leaked cycle | ~200–350 KB |
Fleet-level scaling
The monitor runs every 1 minute per cluster. With a 50s timeout:
| Scenario | Clusters timing out | Goroutine overlap | Leaked memory |
|---|---|---|---|
| Normal (5s cleanup after cancel) | 20 of 200 | 5s / 60s | ~0.5 MB |
| Stress (30s cleanup, slow TCP RST) | 20 of 200 | 30s / 60s | ~3 MB |
| Worst case (goroutines truly stuck) | 20 of 200 | Compounding each minute | ~6 MB/min, ~60 MB over 10 min |
The goroutines themselves aren't the memory hog — it's the entire
Monitor struct tree (K8s clientsets, HTTP connection pools, TLS
state) that remains rooted in memory through the closure references
in wg.Go() until every collector goroutine returns.
The deeper issue is in worker.go:257-264:
```go
go execute(ctx, log, allJobsDone, monitors, onPanic)

select {
case <-allJobsDone:
case <-ctx.Done(): // workOne returns here, but execute is STILL RUNNING
}
```
When ctx.Done() wins the select, workOne returns — but the execute
goroutine (holding the WaitGroup, the collector goroutines, and the
entire Monitor struct via closures) stays alive until all collectors
finish. The next monitoring cycle creates a new Monitor with new K8s
clients while the old one hasn't been GC'd yet.
This PR helps by making queued-but-not-yet-started collectors return
instantly. But the 5 currently running collectors (doing K8s API
calls) still linger, keeping the Monitor alive.
Other findings worth a follow-up
The one item worth attention beyond the monitor:
- worker.go:187 uses context.Background() for workOne, meaning the
WithTimeout(ctx, 50*time.Second) inside workOne is the only
cancellation mechanism. If the parent worker goroutine stops (via
stop channel), in-flight workOne calls are not notified — they run to
completion or timeout independently.
Opened a follow-up PR to fix the goroutine leak identified here: #4714
TL;DR: allJobsDone is an unbuffered channel. When the 50s timeout fires and workOne returns, the execute goroutine blocks forever on `done <- true` (no receiver). Fix: `make(chan bool)` → `make(chan bool, 1)`.
What this PR does / why we need it:
If we've timed out, stop spawning new goroutines.
Test plan for issue:
Unit tests
Is there any documentation that needs to be updated for this PR?
N/A
How do you know this will function as expected in production?
Testing